Add max_bandwidth option #101
Conversation
```python
pass


class TimeUtils(object):
```
Any reason we're not using the time module directly? Just to restrict the interface?
It is so you can inject your own implementations. We do this a lot in s3transfer with OSUtils, and this pattern is used a lot in chalice. If you didn't, you would have to patch a bunch of things to affect the behavior of time.
What I mean is we don't need a wrapper class to do this. time_utils=time would also work, as our constraint is that the thing passed has time and sleep callables. The only benefit I see is that there is an example of what interface the injected time module needs to meet.
If we ever need anything more complicated in the future, this will allow us to have it. OSUtils has things like that which aren't direct mappings.
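For context, a minimal sketch of the injectable wrapper under discussion (the class name and methods follow the diff; the exact body is an assumption):

```python
import time


class TimeUtils(object):
    """Thin wrapper over the time module so tests can inject fakes."""

    def time(self):
        # Current time in seconds since the epoch.
        return time.time()

    def sleep(self, value):
        # Sleep for the given number of seconds.
        time.sleep(value)
```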
s3transfer/bandwidth.py
Outdated
```python
projected_rate = self._rate_tracker.get_projected_rate(amt, time_now)
return projected_rate > self._max_rate


def _consume_for_scheduled_consumption(self, amt, request_token, time_now):
```
Some of these private function names are a little confusing.
+1
s3transfer/bandwidth.py
Outdated
```python
class BandwidthRateTracker(object):
    def __init__(self, a=0.8):
        """Tracks the rate of bandwidth consumption"""
        self._a = a
```
I have no idea what this variable is for.
Yeah, can you rename this to alpha or something more descriptive? I wouldn't expect everyone to know the variables used in EMA formulas.
s3transfer/bandwidth.py
Outdated
```python
def _calculate_rate(self, amt, time_at_consumption):
    return amt/(time_at_consumption - self._last_time)


def _calculate_ema_rate(self, amt, time_at_consumption):
```
What does ema mean?
Exponential moving average. I can see if I can rename it or add a comment.
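For reference, a minimal sketch of an EMA-based rate tracker (the attribute names follow the diff; the exact formula is an assumption based on the standard EMA definition):

```python
class BandwidthRateTracker(object):
    def __init__(self, alpha=0.8):
        """Tracks the rate of bandwidth consumption.

        :param alpha: EMA smoothing factor in (0, 1]. Higher values
            weight recent samples more heavily.
        """
        self._alpha = alpha
        self._last_time = None
        self._current_rate = None

    def record_consumption_rate(self, amt, time_at_consumption):
        # Seed on the first sample, then fold each new sample into
        # the exponentially weighted moving average.
        if self._last_time is None:
            self._current_rate = 0.0
        else:
            self._current_rate = self._calculate_ema_rate(
                amt, time_at_consumption)
        self._last_time = time_at_consumption

    def _calculate_rate(self, amt, time_at_consumption):
        # Instantaneous rate since the last recorded consumption.
        return amt / (time_at_consumption - self._last_time)

    def _calculate_ema_rate(self, amt, time_at_consumption):
        # Standard EMA: blend the newest sample with the running rate.
        new_rate = self._calculate_rate(amt, time_at_consumption)
        return self._alpha * new_rate + (1 - self._alpha) * self._current_rate
```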
```python
# This is just a smoke test to make sure that the limiter is
# being used and not necessary its exactness. So we set the maximum
# bandwidth to len(content)/2 per sec and make sure that it is
# noticeably slower. Ideally it will take more than two seconds, but
```
Fix up grammar/spelling here.
Looking at this now, but would you mind adding a more detailed commit message? This is a pretty big feature and I think it's worth more of an explanation.
```python
return self._fileobj.read(amount)


self._consume_through_leaky_bucket()
return self._fileobj.read(amount)
```
This is going to result in very bursty reads, I think, in cases where someone passes a large number. I don't think there's a great way around this besides allowing partial reads, which I think would be a change in the contract. However, it might make sense to at least document that large numbers passed to amount might reduce the effectiveness of bandwidth limiting.
Yep, that is true if they were passing a large number. The good news is they really do not have control over the value passed into read() for this abstraction. For downloads, the internals of s3transfer will always read 256KB when streaming data down, and for uploads httplib uses 8KB (which we don't even have control over). So given that, the burstiness will be pretty small, if present at all.
What's more worrisome for me is the fact that this seems to rely on multiple threads to smooth over the burstiness. The fewer threads there are, the more bursting you get. This also appears to throw off the bandwidth calculations. I'm not quite sure why this happens, but it's really pronounced if you set the bandwidth to something high and the max concurrent requests to 1. In one test, I had 100MB+ bandwidth available, and with one thread and max_bandwidth set to something really high (on the order of GB/s) I was only getting about 15MB/s.

Any insight on what's going on here? I haven't had a whole lot of time to dig into this.
I tried looking into it. I was running on a machine where, with the default configuration of 10 threads, I was getting 100MB+, but I was never able to get a single thread (with bandwidth limiting enabled or not) to ever reach that. I was seeing speeds of ~25 MB/s with bandwidth limiting turned off, and turning it on with something high resulted in the same speeds. I saw this for both uploads and downloads. I would want to see it in your environment though.

As to the bandwidth calculations, those are just bursty in general because we aggregate the amounts we report back, so if only one thread is working it will report in increments of that threshold. When there are more threads reporting, the progress updates are smoother because more threads are forcing the progress to update. So that may explain it? If the threshold is low and you have one thread working, I can see the progress being even more bursty because there is only one thread reporting and it has to do sleeps.
s3transfer/bandwidth.py
Outdated
```python
class RequestExceededException(Exception):
    def __init__(self, requested_amt, retry_time):
        """Error
```
This isn't very helpful
s3transfer/bandwidth.py
Outdated
""" | ||
self._token_bucket = token_bucket | ||
self._time_utils = time_utils | ||
if not time_utils: |
Use an explicit time_utils is not None check here.
s3transfer/bandwidth.py
Outdated
```python
self._leaky_bucket = leaky_bucket
self._transfer_coordinator = transfer_coordinator
self._time_utils = time_utils
if not time_utils:
```
Same here: use an explicit time_utils is not None check.
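A sketch of the suggested idiom (the constructor shape follows the diff; the surrounding details are assumptions):

```python
def __init__(self, leaky_bucket, transfer_coordinator, time_utils=None):
    self._leaky_bucket = leaky_bucket
    self._transfer_coordinator = transfer_coordinator
    self._time_utils = time_utils
    # An explicit None check avoids accidentally replacing a
    # falsy-but-valid time_utils object with the default.
    if time_utils is None:
        self._time_utils = TimeUtils()
```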
s3transfer/bandwidth.py
Outdated
```python
self._bytes_seen = 0
self._bytes_threshold = bytes_threshold


def enable(self):
```
How about something like enable_bandwidth_limiting (and similarly for disable)? This makes it seem like you're enabling/disabling the object itself rather than the limiting feature.
s3transfer/bandwidth.py
Outdated
```python
# We do not want to be calling consume on every read as the read
# amounts can be small causing the lock of the leaky bucket to
# introduce noticeable overhead. So instead we keep track of
# how many bytes we seen and only call consume once we pass a
```
we have seen
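For context, a sketch of the aggregation the diff comment describes (method and attribute names follow the diff where shown; the rest is an assumption):

```python
def read(self, amount):
    if self._bandwidth_limiting_enabled:
        # Accumulate small reads and only take the leaky bucket's
        # lock once enough bytes have built up.
        self._bytes_seen += amount
        if self._bytes_seen >= self._bytes_threshold:
            self._consume_through_leaky_bucket()
    return self._fileobj.read(amount)


def _consume_through_leaky_bucket(self):
    # Blocks until the leaky bucket grants the accumulated amount.
    self._leaky_bucket.consume(self._bytes_seen, self._request_token)
    self._bytes_seen = 0
```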
Looks good, I had a few small comments.
Do we ever debug log the fact that we're throttling bandwidth (and what the max bandwidth value is)?
s3transfer/bandwidth.py
Outdated
"""Signal that data being read is being transferred to S3""" | ||
self.enable() | ||
if hasattr(self._fileobj, 'signal_transferring'): | ||
self._fileobj.signal_transferring() |
This whole signal_transferring thing is really confusing; can you explain it better? Also, why is the hasattr check here? The readfilechunk I get, but I don't follow when self._fileobj would ever have a signal_transferring method for this class. It at least deserves a comment. I'm also not a fan of these hasattr()-style checks, and I'd prefer we not continue this pattern going forward (it makes it less clear what the expected interfaces are supposed to be).
I explained the signal transferring portion in the comment.

As to the hasattr check, that is because of a whole other issue. The issue boils down to this: we need one shared event handler, with no shared state, that can indicate to a stream that the file is being read for the specific reason of being transferred to S3. We need one shared handler because one handler per request really bogs down the system. We need no shared state because the handler could be invoked by separate threads.

So to achieve this, the current strategy is to check for a method on the stream and call it in the handlers to enable/disable transfer-reading functionality. This is what we had in place for callbacks. However, if the stream is wrapped multiple times and two of these wrappers require this enabling/disabling functionality, previously only the top-level wrapper would have its method called, and the wrapper underneath would not. So instead, the top-level wrapper needs to be able to signal to the wrapper underneath that it was told to enable/disable whatever functionality it needs to, so it can do the same. I tried to generalize all of this with the signal_transferring() and signal_not_transferring() methods. I'm up for suggestions on how to improve this.
```diff
 self._client.meta.events.register_last(
-    event_name, enable_upload_callbacks,
+    event_name, signal_transferring,
     unique_id='s3upload-callback-enable')
```
How is the last event handler on a request-created event indicative that we're transferring? Does this just mean "uploads can now happen from this point on"?
I am not a big fan of it either, but essentially what it is supposed to encompass is indicating that the stream is being read because it is being transferred. This is needed because the stream for uploads can be read for the following reasons:
- It is being read and uploaded to S3.
- It is being read to compute the content MD5.
- It is being read into memory to calculate the SHA256 for a sigv4 request.

The problem is that when you have functionality that is specific to the file stream being read for the specific purpose of uploading to S3 (i.e. transfer callbacks and transfer bandwidth limiting), it is ambiguous why the read is happening. We handle the MD5 case because these are disabled to start. However, the SHA256 portion is tricky because the signing happens during request creation, and request-created is the last event we have before sending the request over the wire. To solve it, we straddle the handler that does the SHA256 calculation by disabling at the start and enabling it back at the very end; see the sketch below. Hence signal_not_transferring() comes to mean "even though the stream is being read from, it is not actually being sent to S3", and signal_transferring() means "if the stream is being read from, it is because the data is being transferred".

I would be up for better names if that is confusing, but I am not sure if there is a better mechanism to use without exposing something new in botocore. Not sure if you have any thoughts?
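A sketch of the straddling described above, using botocore's event system (handler names follow the diff; the exact registration points are assumptions):

```python
# Runs first on request-created: reads during signing (e.g. the
# SHA256 body hash for sigv4) should not count as transferring.
client.meta.events.register_first(
    'request-created.s3', signal_not_transferring,
    unique_id='s3upload-not-transferring')

# Runs last on request-created: any read from this point on is the
# actual upload to S3, so transfer-time behavior is re-enabled.
client.meta.events.register_last(
    'request-created.s3', signal_transferring,
    unique_id='s3upload-transferring')
```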
s3transfer/bandwidth.py
Outdated
```python
self._last_time = time_at_consumption


def _calculate_rate(self, amt, time_at_consumption):
    return amt/(time_at_consumption - self._last_time)
```
Add spacing around the division next to amt.
@JordonPhillips I think I got all of your comments.

@joguSD I addressed all of your comments. Did not change the

@dstufft I put a comment about what you were talking about for large transfers. Still looking at what @jamesls brought up. Let me know if there was something specific you were thinking of?

@jamesls I am still working through your feedback, but yeah, I will add a better commit message. I'm probably going to squash all of these and write a better commit message before it gets merged in.
Also, as to debug logging: I do not have any set currently. To track retries and consumption amounts I had an abstraction that I could query, but I yanked that from the original review. How do these logs sound?
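As an illustration only (not the PR's actual proposal), hypothetical throttling logs might look like:

```python
import logging

logger = logging.getLogger('s3transfer.bandwidth')


def log_bandwidth_events(max_bandwidth, requested_amt, retry_time):
    # Hypothetical examples of the kind of debug logs being discussed.
    logger.debug('Setting max_bandwidth to %s B/s', max_bandwidth)
    logger.debug(
        'Consumption of %s bytes exceeded bandwidth limit; '
        'retrying in %s seconds', requested_amt, retry_time)
```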
Force-pushed from 392dd28 to 195b10a.
Capturing offline feedback: we tried to repro the degradation of single-threaded performance offline but weren't able to come up with anything. Not sure what the root cause was, but it was likely a mistake on my end. Given that, I think everything looks good to me.
s3transfer/manager.py
Outdated
```python
if self._config.max_bandwidth is not None:
    logging.debug(
        'Setting max_bandwidth to %s', self._config.max_bandwidth)
    token_bucket = LeakyBucket(self._config.max_bandwidth)
```
s/token_bucket/leaky_bucket/g
This limits the rate at which uploads and downloads can stream content to and from S3. The abstraction uses a leaky bucket to control bandwidth consumption.
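A minimal sketch of the leaky-bucket idea (illustrative only; the PR's LeakyBucket also schedules per-caller retries):

```python
import threading
import time


class SimpleLeakyBucket(object):  # illustrative, not the PR's class
    def __init__(self, max_rate):
        """:param max_rate: Maximum allowed rate in bytes per second."""
        self._max_rate = max_rate
        self._lock = threading.Lock()
        self._start_time = None
        self._bytes_consumed = 0

    def consume(self, amt):
        # Block until consuming amt keeps the average rate at or
        # below max_rate, then record the consumption.
        while True:
            with self._lock:
                now = time.time()
                if self._start_time is None:
                    self._start_time = now
                elapsed = max(now - self._start_time, 1e-9)
                projected = (self._bytes_consumed + amt) / elapsed
                if projected <= self._max_rate:
                    self._bytes_consumed += amt
                    return amt
                # Time needed for the average rate to fall to max_rate.
                wait = (self._bytes_consumed + amt) / self._max_rate - elapsed
            time.sleep(wait)
```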
This was not seen in real-life transfers, but in the functional tests the time deltas between reads were infinitesimally small, causing the rate to be calculated with a time of 0 and throw a ZeroDivisionError.
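A sketch of a guard for that case (the actual fix in the PR may differ):

```python
def _calculate_rate(self, amt, time_at_consumption):
    time_delta = time_at_consumption - self._last_time
    if time_delta <= 0:
        # Back-to-back reads can land within the same clock tick;
        # report an effectively infinite rate instead of dividing
        # by zero.
        return float('inf')
    return amt / time_delta
```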
Force-pushed from cc4a5ba to 94b402d.
Allows users to configure the max bandwidth consumption for the streaming of data for uploads and downloads. The value is set in terms of bytes per second. I would recommend trying it out with the CLI PR that I am planning to send up soon.
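A hypothetical usage sketch (the max_bandwidth parameter follows the PR description; the surrounding API surface is an assumption):

```python
import boto3
from s3transfer.manager import TransferConfig, TransferManager

# Cap transfer bandwidth at 1 MB/s; the value is in bytes per second.
config = TransferConfig(max_bandwidth=1024 * 1024)
client = boto3.client('s3')
with TransferManager(client, config) as manager:
    future = manager.upload('/tmp/largefile', 'mybucket', 'mykey')
    future.result()
```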